import json

from kafka import KafkaProducer

# Number of partitions the target topic was created with; the partition
# index computed below must stay within [0, NUM_PARTITIONS).
NUM_PARTITIONS = 3

# 1. Create the producer.
producer = KafkaProducer(bootstrap_servers="master:9092")

# The topic is expected to exist already, created for example with:
# kafka-topics.sh --bootstrap-server master:9092 --create --topic students_partition_age --partitions 3
try:
    # 2. Read the file, streaming it line by line instead of loading it whole.
    with open("../data/students.json", mode="r", encoding="utf-8") as f:
        # 3. Write each record to Kafka.
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                # Blank lines (e.g. a trailing newline at end of file) are not
                # valid JSON documents; skip them rather than crash.
                continue

            # Send records with the same age to the same partition.
            # NOTE(review): assumes "age" is an integer in every record — confirm.
            age = json.loads(line)["age"]
            partition = age % NUM_PARTITIONS

            producer.send(
                topic="students_partition_age",
                value=line.encode("utf-8"),
                partition=partition,
            )

    # Flush once after all records are queued so the producer can batch
    # sends, instead of blocking on the broker after every single record.
    producer.flush()
finally:
    # Always release the producer's resources, even if reading or parsing
    # the input failed part-way through.
    producer.close()
